| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- """
- Tee 日志捕获工具
- 支持多 Agent 并发执行:
- - 每个 Agent 通过 build_log(build_id) 注册自己的日志 buffer
- - log() 函数根据 contextvars 自动路由到当前 Agent 的 buffer
- - 同时输出到真实 stdout,不劫持 sys.stdout
- """
- import io
- import sys
- import contextvars
- import threading
- from pathlib import Path
- from typing import TextIO, Union
- from contextlib import contextmanager
# build_id bound to the current Agent execution. It lives in a ContextVar so
# that it follows the execution context (intended to propagate across
# asyncio.to_thread).
# For historical compatibility an id may be an int or a uuid string.
BuildId = Union[int, str]
_current_build_id: contextvars.ContextVar[BuildId | None] = contextvars.ContextVar(
    "log_build_id", default=None
)
# Global registry: build_id -> in-memory StringIO buffer.
# Insertions and removals are guarded by _buffers_lock.
_buffers: dict[BuildId, io.StringIO] = {}
_buffers_lock = threading.Lock()
# Global registry: build_id -> file handle opened for appending.
# Insertions and removals are guarded by _files_lock.
_file_handles: dict[BuildId, TextIO] = {}
_files_lock = threading.Lock()
# The stdout object as it was when this module was imported; rebinding
# sys.stdout later does not change where log() echoes.
_real_stdout = sys.stdout
def attach_log_file(build_id: BuildId, file_path: str | Path) -> None:
    """Bind a log file to ``build_id`` so later ``log()`` calls also append to it.

    The path has ``~`` expanded, missing parent directories are created, and
    the file is opened in UTF-8 append mode. If ``build_id`` already has a
    handle registered, that handle is closed and replaced by the new one.

    Args:
        build_id: Identifier of the build whose log output should be mirrored.
        file_path: Destination file; created if it does not exist.
    """
    target = Path(file_path).expanduser()
    target.parent.mkdir(parents=True, exist_ok=True)
    new_handle = target.open("a", encoding="utf-8")

    with _files_lock:
        previous = _file_handles.pop(build_id, None)
        if previous is not None:
            try:
                previous.close()
            except Exception:
                # Best effort: a failing close must not prevent the rebind.
                pass
        _file_handles[build_id] = new_handle
def detach_log_file(build_id: BuildId) -> None:
    """Unbind and close the log file registered for ``build_id``, if any.

    Does nothing when no file is bound. Errors raised while closing the
    handle are ignored.
    """
    with _files_lock:
        handle = _file_handles.pop(build_id, None)
        if handle is None:
            return
        try:
            handle.close()
        except Exception:
            # Best effort: closing failures are deliberately swallowed.
            pass
def log(*args, **kwargs):
    """Print-like logging helper that also feeds the current Agent's sinks.

    The message is always printed to the stdout object captured when this
    module was imported. When a build_id is bound in the current context, the
    same message is additionally printed into that build's in-memory buffer
    (if registered) and appended to its attached log file (if any), which is
    flushed after each call. Outside an Agent context this is just ``print()``
    to the captured stdout.

    Args:
        *args: Positional arguments forwarded to ``print()``.
        **kwargs: Keyword arguments forwarded to ``print()`` (e.g. ``sep``,
            ``end``). ``file`` is supplied internally.
    """
    # Step 1: unconditionally echo to the real stdout.
    print(*args, file=_real_stdout, **kwargs)

    current_id = _current_build_id.get()
    if current_id is None:
        # Not inside an Agent context: nothing more to do.
        return

    # Step 2: mirror into the in-memory buffer of the current build.
    buffer = _buffers.get(current_id)
    if buffer is not None:
        print(*args, file=buffer, **kwargs)

    # Step 3: mirror into the attached log file, flushing immediately.
    handle = _file_handles.get(current_id)
    if handle is None:
        return
    try:
        print(*args, file=handle, **kwargs)
        handle.flush()
    except Exception:
        # A failed file write must not disturb the main flow.
        pass
@contextmanager
def build_log(build_id: BuildId):
    """Context manager that scopes log capture to one Agent execution.

    On entry a fresh ``StringIO`` is created, ``build_id`` is bound in the
    current context, and the buffer is registered under ``build_id``; the
    buffer is yielded to the caller.

    Usage:
        with build_log(build_id):
            log("this line is written to the buffer")
            ...

    On exit (normal or exceptional) the buffer is unregistered, any log file
    attached to ``build_id`` is detached and closed, the previous context
    value is restored, and the buffer itself is closed.

    Args:
        build_id: Identifier under which the buffer is registered.

    Yields:
        The ``io.StringIO`` collecting this build's log output.
    """
    capture = io.StringIO()
    ctx_token = _current_build_id.set(build_id)
    with _buffers_lock:
        _buffers[build_id] = capture

    try:
        yield capture
    finally:
        # Tear down in the same order as registration was undone originally:
        # registry entry, attached file, context binding, then the buffer.
        with _buffers_lock:
            _buffers.pop(build_id, None)
        detach_log_file(build_id)
        _current_build_id.reset(ctx_token)
        capture.close()
@contextmanager
def log_fold(label: str):
    """Wrap the enclosed log output in a collapsible block.

    Emits ``[FOLD:<label>]`` through ``log()`` on entry and ``[/FOLD]`` on
    exit; the closing marker is written even when the body raises.

    Args:
        label: Title embedded in the opening marker.
    """
    opening_marker = f"[FOLD:{label}]"
    closing_marker = "[/FOLD]"

    log(opening_marker)
    try:
        yield
    finally:
        log(closing_marker)
def get_log_content(build_id: BuildId) -> str | None:
    """Return the log text collected so far for ``build_id`` (live view).

    Returns:
        The current contents of the build's buffer, or ``None`` when no
        buffer is registered for ``build_id``.
    """
    capture = _buffers.get(build_id)
    if not capture:
        return None
    return capture.getvalue()
def _save_to_db(build_id: int, content: str) -> bool:
    """Legacy-compatible stub: persisting logs to the database is disabled.

    Both arguments are accepted only to keep the old call signature and are
    ignored.

    Returns:
        Always ``False``.
    """
    persisted = False
    return persisted
- # ============================================================================
- # 兼容旧接口 — TeeStream(仅供单线程场景使用,如 run_build_topic_agent.py)
- # ============================================================================
class TeeStream(io.TextIOBase):
    """Write-only text stream that tees output to a stream and a memory buffer.

    Everything written is forwarded to the wrapped stream and also recorded
    in an internal ``StringIO`` that can be read back with ``get_log()``.

    Warning: intended for single-process, single-Agent use only (e.g. running
    from the command line). For concurrent scenarios use ``build_log()``.
    """

    def __init__(self, original_stdout):
        """Wrap ``original_stdout`` and start with an empty capture buffer."""
        super().__init__()
        self.original_stdout = original_stdout
        self._buffer = io.StringIO()

    def write(self, s):
        """Write ``s`` to both sinks and return the number of characters written.

        Falsy input (such as an empty string) is not forwarded and yields 0.
        """
        if not s:
            return 0
        self.original_stdout.write(s)
        self._buffer.write(s)
        return len(s)

    def flush(self):
        """Flush the wrapped stream first, then the internal buffer."""
        for sink in (self.original_stdout, self._buffer):
            sink.flush()

    def get_log(self) -> str:
        """Return everything captured so far."""
        captured = self._buffer.getvalue()
        return captured

    def save_to_db(self, build_id: int) -> bool:
        """Legacy stub kept for compatibility; always returns ``False``."""
        return False

    @property
    def encoding(self):
        """Encoding reported by the wrapped stream."""
        return self.original_stdout.encoding

    def isatty(self):
        """Always ``False``: this stream never reports itself as a terminal."""
        return False

    def readable(self):
        """Always ``False``: reading is not supported."""
        return False

    def writable(self):
        """Always ``True``: the stream accepts writes."""
        return True
|