""" Tee 日志捕获工具 支持多 Agent 并发执行: - 每个 Agent 通过 build_log(build_id) 注册自己的日志 buffer - log() 函数根据 contextvars 自动路由到当前 Agent 的 buffer - 同时输出到真实 stdout,不劫持 sys.stdout """ import io import sys import contextvars import threading from pathlib import Path from typing import TextIO, Union from contextlib import contextmanager # 当前 Agent 执行绑定的 build_id(通过 contextvars 跨 asyncio.to_thread 传播) # 兼容历史:可能是 int,也可能是 uuid 字符串 BuildId = Union[int, str] _current_build_id: contextvars.ContextVar[BuildId | None] = contextvars.ContextVar( "log_build_id", default=None ) # build_id → StringIO buffer 的全局注册表(线程安全) _buffers: dict[BuildId, io.StringIO] = {} _buffers_lock = threading.Lock() # build_id → 追加写入的文件句柄(线程安全) _file_handles: dict[BuildId, TextIO] = {} _files_lock = threading.Lock() # 保存真实 stdout(进程启动时的值,不会被覆盖) _real_stdout = sys.stdout def attach_log_file(build_id: BuildId, file_path: str | Path) -> None: """ 绑定实时落盘文件:后续 log() 会在写 stdout/buffer 的同时 append 到该文件。 - 会自动创建父目录 - 会以 utf-8 追加模式打开 - 重复绑定同一个 build_id 会关闭旧句柄并替换 """ path = Path(file_path).expanduser() path.parent.mkdir(parents=True, exist_ok=True) fh = path.open("a", encoding="utf-8") with _files_lock: old = _file_handles.pop(build_id, None) if old is not None: try: old.close() except Exception: pass _file_handles[build_id] = fh def detach_log_file(build_id: BuildId) -> None: """解除绑定并关闭文件句柄(若存在)。""" with _files_lock: fh = _file_handles.pop(build_id, None) if fh is not None: try: fh.close() except Exception: pass def log(*args, **kwargs): """并发安全的日志函数,替代 print()。 同时输出到 stdout 和当前 Agent 的日志 buffer。 如果不在 Agent 上下文中,等同于普通 print()。 """ # 1. 始终输出到真实 stdout print(*args, file=_real_stdout, **kwargs) # 2. 如果在 Agent 上下文中,额外写入 buffer build_id = _current_build_id.get() if build_id is not None: buf = _buffers.get(build_id) if buf is not None: print(*args, file=buf, **kwargs) fh = _file_handles.get(build_id) if fh is not None: try: print(*args, file=fh, **kwargs) fh.flush() except Exception: # 文件写入失败不应影响主流程 pass @contextmanager def build_log(build_id: BuildId): """Agent 执行的日志上下文管理器。 使用方式: with build_log(build_id): log("这条会写入 buffer") ... # with 结束后仅清理内存缓冲区 """ buf = io.StringIO() token = _current_build_id.set(build_id) with _buffers_lock: _buffers[build_id] = buf try: yield buf finally: # 清理 with _buffers_lock: _buffers.pop(build_id, None) detach_log_file(build_id) _current_build_id.reset(token) buf.close() @contextmanager def log_fold(label: str): """可折叠日志块的上下文管理器""" log(f"[FOLD:{label}]") try: yield finally: log("[/FOLD]") def get_log_content(build_id: BuildId) -> str | None: """获取指定 build 当前已收集的日志内容(用于实时查看)""" buf = _buffers.get(build_id) return buf.getvalue() if buf else None def _save_to_db(build_id: int, content: str) -> bool: """兼容旧接口:已禁用 DB 落库。""" return False # ============================================================================ # 兼容旧接口 — TeeStream(仅供单线程场景使用,如 run_build_topic_agent.py) # ============================================================================ class TeeStream(io.TextIOBase): """Tee 模式的输出流:同时写入原始 stdout 和内部缓冲区 ⚠️ 仅供单进程单 Agent 使用(如命令行运行),并发场景请使用 build_log()。 """ def __init__(self, original_stdout): super().__init__() self.original_stdout = original_stdout self._buffer = io.StringIO() def write(self, s): if s: self.original_stdout.write(s) self._buffer.write(s) return len(s) if s else 0 def flush(self): self.original_stdout.flush() self._buffer.flush() def get_log(self) -> str: return self._buffer.getvalue() def save_to_db(self, build_id: int) -> bool: return False @property def encoding(self): return self.original_stdout.encoding def isatty(self): return False def readable(self): return False def writable(self): return True