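"""CLI entry point for the content-finder pipeline.

Runs the pipeline once with dual-channel logging (console at INFO, a temporary
file at DEBUG) and, on success, moves the full DEBUG log into
tests/traces/<trace_id>/full_log.log.
"""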
from __future__ import annotations

import asyncio
import logging
import os
import shutil
import sys
import tempfile

from dotenv import load_dotenv

from src.pipeline.runner import run_content_finder_from_cli

load_dotenv()
# ── Log levels are controlled by environment variables; default DEBUG captures everything ──
_LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
_CONSOLE_LEVEL = os.getenv("CONSOLE_LOG_LEVEL", "INFO").upper()
_LOG_FMT = "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s"
_LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"


def _setup_logging(log_file_path: str) -> logging.FileHandler:
    """
    Configure dual-channel logging: console (INFO) + file (DEBUG).

    Intercepts output from every child logger via the root logger, so the
    agent core code does not need to be modified.
    """
    root = logging.getLogger()
    root.setLevel(getattr(logging, _LOG_LEVEL, logging.DEBUG))

    formatter = logging.Formatter(fmt=_LOG_FMT, datefmt=_LOG_DATEFMT)

    # Console handler: progress output at CONSOLE_LOG_LEVEL (INFO by default)
    console = logging.StreamHandler(sys.__stdout__)
    console.setLevel(getattr(logging, _CONSOLE_LEVEL, logging.INFO))
    console.setFormatter(formatter)
    root.addHandler(console)

    # File handler: full DEBUG log written to the given path
    fh = logging.FileHandler(log_file_path, mode="w", encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    root.addHandler(fh)

    # Silence chatty third-party loggers
    for noisy in ("httpx", "httpcore", "urllib3", "asyncio"):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    # Keep agent core logs out of the full log file (reduces noise)
    class _AgentLogFilter(logging.Filter):
        def filter(self, record: logging.LogRecord) -> bool:
            return not record.name.startswith("agent.")

    fh.addFilter(_AgentLogFilter())
    return fh


logger = logging.getLogger(__name__)


async def main() -> None:
    """Run the pipeline once and archive the full DEBUG log next to its trace."""
    # Write the full log to a temporary file first; it is moved into the
    # trace directory once the run finishes and the trace_id is known.
    tmp = tempfile.NamedTemporaryFile(
        delete=False, suffix=".log", prefix="pipeline_run_", mode="w", encoding="utf-8",
    )
    tmp_path = tmp.name
    tmp.close()

    file_handler = _setup_logging(tmp_path)
    try:
        # Query and demand id can be overridden via environment variables; the
        # default query (in Chinese) reads "Iran, Israel, peace is the eternal theme".
        query = os.getenv("PIPELINE_QUERY", "伊朗、以色列、和平是永恒的主题")
        demand_id = os.getenv("PIPELINE_DEMAND_ID", "1")
        result = await run_content_finder_from_cli(query=query, demand_id=demand_id)
        logger.info("pipeline trace_id=%s", result.trace_id)
        logger.info("pipeline output=%s", result.metadata.get("output_file", ""))

        # Move the log file into the trace directory
        file_handler.close()
        trace_dir = os.path.join("tests", "traces", result.trace_id)
        os.makedirs(trace_dir, exist_ok=True)
        dest = os.path.join(trace_dir, "full_log.log")
        shutil.move(tmp_path, dest)
        logger.info("Full log saved: %s", dest)
    finally:
        # Release the file handler even if the pipeline raised, then remove the
        # temp file if it was not moved into the trace directory.
        file_handler.close()
        logging.getLogger().removeHandler(file_handler)
        if os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError:
                pass


if __name__ == "__main__":
    asyncio.run(main())
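
# Example invocation (the script name below is illustrative; use the actual file path):
#   PIPELINE_QUERY="..." PIPELINE_DEMAND_ID=1 CONSOLE_LOG_LEVEL=INFO python run_pipeline.py
# LOG_LEVEL, CONSOLE_LOG_LEVEL, PIPELINE_QUERY, and PIPELINE_DEMAND_ID are all
# optional; the defaults defined above are used when they are unset.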