| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- """
- Tee 日志捕获工具
- 支持多 Agent 并发执行:
- - 每个 Agent 通过 build_log(build_id) 注册自己的日志 buffer
- - log() 函数根据 contextvars 自动路由到当前 Agent 的 buffer
- - 同时输出到真实 stdout,不劫持 sys.stdout
- """
- import io
- import sys
- import contextvars
- import threading
- from contextlib import contextmanager
- # 当前 Agent 执行绑定的 build_id(通过 contextvars 跨 asyncio.to_thread 传播)
- _current_build_id: contextvars.ContextVar[int | None] = contextvars.ContextVar(
- 'log_build_id', default=None
- )
- # build_id → StringIO buffer 的全局注册表(线程安全)
- _buffers: dict[int, io.StringIO] = {}
- _buffers_lock = threading.Lock()
- # 保存真实 stdout(进程启动时的值,不会被覆盖)
- _real_stdout = sys.stdout
- def log(*args, **kwargs):
- """并发安全的日志函数,替代 print()。
- 同时输出到 stdout 和当前 Agent 的日志 buffer。
- 如果不在 Agent 上下文中,等同于普通 print()。
- """
- # 1. 始终输出到真实 stdout
- print(*args, file=_real_stdout, **kwargs)
- # 2. 如果在 Agent 上下文中,额外写入 buffer
- build_id = _current_build_id.get()
- if build_id is not None:
- buf = _buffers.get(build_id)
- if buf is not None:
- print(*args, file=buf, **kwargs)
- @contextmanager
- def build_log(build_id: int):
- """Agent 执行的日志上下文管理器。
- 使用方式:
- with build_log(build_id):
- log("这条会写入 buffer")
- ...
- # with 结束后仅清理内存缓冲区
- """
- buf = io.StringIO()
- token = _current_build_id.set(build_id)
- with _buffers_lock:
- _buffers[build_id] = buf
- try:
- yield buf
- finally:
- # 清理
- with _buffers_lock:
- _buffers.pop(build_id, None)
- _current_build_id.reset(token)
- buf.close()
- @contextmanager
- def log_fold(label: str):
- """可折叠日志块的上下文管理器"""
- log(f"[FOLD:{label}]")
- try:
- yield
- finally:
- log("[/FOLD]")
- def get_log_content(build_id: int) -> str | None:
- """获取指定 build 当前已收集的日志内容(用于实时查看)"""
- buf = _buffers.get(build_id)
- return buf.getvalue() if buf else None
- def _save_to_db(build_id: int, content: str) -> bool:
- """兼容旧接口:已禁用 DB 落库。"""
- return False
- # ============================================================================
- # 兼容旧接口 — TeeStream(仅供单线程场景使用,如 run_build_topic_agent.py)
- # ============================================================================
- class TeeStream(io.TextIOBase):
- """Tee 模式的输出流:同时写入原始 stdout 和内部缓冲区
- ⚠️ 仅供单进程单 Agent 使用(如命令行运行),并发场景请使用 build_log()。
- """
- def __init__(self, original_stdout):
- super().__init__()
- self.original_stdout = original_stdout
- self._buffer = io.StringIO()
- def write(self, s):
- if s:
- self.original_stdout.write(s)
- self._buffer.write(s)
- return len(s) if s else 0
- def flush(self):
- self.original_stdout.flush()
- self._buffer.flush()
- def get_log(self) -> str:
- return self._buffer.getvalue()
- def save_to_db(self, build_id: int) -> bool:
- return False
- @property
- def encoding(self):
- return self.original_stdout.encoding
- def isatty(self):
- return False
- def readable(self):
- return False
- def writable(self):
- return True
|