- """
- Structured logger.
- Provides step-based, traceable, easy-to-visualize logging.
- """
- import json
- from datetime import datetime
- from typing import Any, Optional
- from pathlib import Path
- class StructuredLogger:
- """
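- Structured logger.
- Features:
- 1. Each step is saved to its own file
- 2. Records a complete timeline
- 3. Supports nested steps (tree structure)
- 4. Easy to visualize and debug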
- """
- def __init__(self, log_dir: str, run_id: str):
- """
- Initialize the logger.
- Args:
- log_dir: Root directory for logs
- run_id: Unique identifier for this run
- """
- self.log_dir = Path(log_dir)
- self.run_id = run_id
- # Create the directory structure
- self.steps_dir = self.log_dir / "steps"
- self.timeline_dir = self.log_dir / "timeline"
- self.artifacts_dir = self.log_dir / "artifacts"
- for dir_path in [self.steps_dir, self.timeline_dir, self.artifacts_dir]:
- dir_path.mkdir(parents=True, exist_ok=True)
- # Timeline records
- self.timeline = []
- self.step_counter = 0
- self.step_stack = []  # IDs of open steps, used for nesting
- # Initialize metadata
- self.metadata = {
- "run_id": run_id,
- "start_time": datetime.now().isoformat(),
- "status": "running",
- "steps_count": 0,
- "log_dir": str(self.log_dir),
- }
- self._save_metadata()
- def start_step(
- self,
- step_name: str,
- step_type: str,
- description: str = "",
- input_data: Any = None
- ) -> int:
- """
- Start a new step.
- Args:
- step_name: Step name (e.g. "extract_keywords", "explore_level_1")
- step_type: Step type (e.g. "extraction", "exploration", "analysis", "evaluation")
- description: Step description
- input_data: Input data
- Returns:
- step_id: Step ID
- """
- self.step_counter += 1
- step_id = self.step_counter
- # Compute the nesting level (based on stack depth)
- level = len(self.step_stack)
- parent_id = self.step_stack[-1] if self.step_stack else None
- step_info = {
- "step_id": step_id,
- "step_name": step_name,
- "step_type": step_type,
- "description": description,
- "level": level,
- "parent_id": parent_id,
- "status": "running",
- "start_time": datetime.now().isoformat(),
- "end_time": None,
- "duration_seconds": None,
- "input": self._serialize(input_data),
- "output": None,
- "error": None,
- }
- # Push onto the stack
- self.step_stack.append(step_id)
- # Save the step's initial state
- self._save_step(step_id, step_info)
- # Append to the timeline
- self.timeline.append({
- "timestamp": step_info["start_time"],
- "event": "step_start",
- "step_id": step_id,
- "step_name": step_name,
- "step_type": step_type,
- })
- self._save_timeline()
- print(f"\n{' ' * level}[Step {step_id}] {step_name} - {description}")
- return step_id
- def end_step(
- self,
- step_id: int,
- output_data: Any = None,
- status: str = "success",
- error: Optional[str] = None
- ):
- """
- End a step.
- Args:
- step_id: Step ID
- output_data: Output data
- status: Step status ("success", "error", "skipped")
- error: Error message, if any
- """
- # Remove the step from the stack, even if it is not on top (out-of-order end)
- if step_id in self.step_stack:
- self.step_stack.remove(step_id)
- # Load the step info
- step_info = self._load_step(step_id)
- # Update the step info
- end_time = datetime.now()
- start_time = datetime.fromisoformat(step_info["start_time"])
- duration = (end_time - start_time).total_seconds()
- step_info.update({
- "status": status,
- "end_time": end_time.isoformat(),
- "duration_seconds": duration,
- "output": self._serialize(output_data),
- "error": error,
- })
- # Save the step's final state
- self._save_step(step_id, step_info)
- # Append to the timeline
- self.timeline.append({
- "timestamp": step_info["end_time"],
- "event": "step_end",
- "step_id": step_id,
- "step_name": step_info["step_name"],
- "status": status,
- "duration_seconds": duration,
- })
- self._save_timeline()
- level = len(self.step_stack)
- status_emoji = "✅" if status == "success" else "❌" if status == "error" else "⏭️"
- print(f"{' ' * level}{status_emoji} [Step {step_id}] Completed in {duration:.2f}s")
- def log_artifact(
- self,
- step_id: int,
- artifact_name: str,
- artifact_data: Any,
- artifact_type: str = "json"
- ) -> str:
- """
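- Save an artifact associated with a step (e.g. API responses, intermediate results).
- Args:
- step_id: Step ID
- artifact_name: Artifact name
- artifact_data: Artifact data
- artifact_type: Artifact type ("json", "text", "image", etc.)
- Returns:
- artifact_path: Path to the artifact file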
- """
- artifact_dir = self.artifacts_dir / f"step_{step_id:04d}"
- artifact_dir.mkdir(parents=True, exist_ok=True)
- if artifact_type == "json":
- artifact_path = artifact_dir / f"{artifact_name}.json"
- with open(artifact_path, "w", encoding="utf-8") as f:
- json.dump(self._serialize(artifact_data), f, ensure_ascii=False, indent=2)
- elif artifact_type == "text":
- artifact_path = artifact_dir / f"{artifact_name}.txt"
- with open(artifact_path, "w", encoding="utf-8") as f:
- f.write(str(artifact_data))
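- else:
- artifact_path = artifact_dir / artifact_name
- # Binary artifacts (e.g. images): accept bytes, encode str as UTF-8
- payload = artifact_data.encode("utf-8") if isinstance(artifact_data, str) else artifact_data
- with open(artifact_path, "wb") as f:
- f.write(payload)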
- print(f" 📎 Artifact saved: {artifact_path.name}")
- return str(artifact_path)
- def finalize(self, final_status: str = "success", final_output: Any = None):
- """
- Finish the run and generate the final summary.
- Args:
- final_status: Final status
- final_output: Final output
- """
- self.metadata.update({
- "end_time": datetime.now().isoformat(),
- "status": final_status,
- "steps_count": self.step_counter,
- "final_output": self._serialize(final_output),
- })
- self._save_metadata()
- # Generate the summary
- self._generate_summary()
- print(f"\n{'='*60}")
- print(f"Run completed: {final_status}")
- print(f"Total steps: {self.step_counter}")
- print(f"Log directory: {self.log_dir}")
- print(f"{'='*60}")
- def _save_step(self, step_id: int, step_info: dict):
- """Save step info."""
- step_file = self.steps_dir / f"step_{step_id:04d}.json"
- with open(step_file, "w", encoding="utf-8") as f:
- json.dump(step_info, f, ensure_ascii=False, indent=2)
- def _load_step(self, step_id: int) -> dict:
- """Load step info."""
- step_file = self.steps_dir / f"step_{step_id:04d}.json"
- with open(step_file, "r", encoding="utf-8") as f:
- return json.load(f)
- def _save_timeline(self):
- """Save the timeline."""
- timeline_file = self.timeline_dir / "timeline.json"
- with open(timeline_file, "w", encoding="utf-8") as f:
- json.dump(self.timeline, f, ensure_ascii=False, indent=2)
- def _save_metadata(self):
- """Save the metadata."""
- metadata_file = self.log_dir / "metadata.json"
- with open(metadata_file, "w", encoding="utf-8") as f:
- json.dump(self.metadata, f, ensure_ascii=False, indent=2)
- def _serialize(self, data: Any) -> Any:
- """Serialize data (handles Pydantic models, etc.)."""
- if data is None:
- return None
- # Handle Pydantic models
- if hasattr(data, "model_dump"):
- return data.model_dump()
- # Handle dictionaries
- if isinstance(data, dict):
- return {k: self._serialize(v) for k, v in data.items()}
- # Handle lists and tuples
- if isinstance(data, (list, tuple)):
- return [self._serialize(item) for item in data]
- # Return other types unchanged
- return data
- def _generate_summary(self):
- """Generate the run summary."""
- summary = {
- "run_id": self.run_id,
- "status": self.metadata["status"],
- "start_time": self.metadata["start_time"],
- "end_time": self.metadata["end_time"],
- "total_steps": self.step_counter,
- "steps_overview": [],
- }
- # Collect all steps
- for step_id in range(1, self.step_counter + 1):
- step_info = self._load_step(step_id)
- summary["steps_overview"].append({
- "step_id": step_id,
- "step_name": step_info["step_name"],
- "step_type": step_info["step_type"],
- "status": step_info["status"],
- "duration_seconds": step_info["duration_seconds"],
- })
- # Save the summary
- summary_file = self.log_dir / "summary.json"
- with open(summary_file, "w", encoding="utf-8") as f:
- json.dump(summary, f, ensure_ascii=False, indent=2)
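Because `start_step` and `end_step` must always be called in pairs, one possible convenience is a context manager that closes the step even when the body raises. The sketch below is not part of the class above. `logged_step` and `RecordingLogger` are hypothetical names introduced only for illustration; `RecordingLogger` is a stand-in that copies the `start_step`/`end_step` signatures so the example runs without touching the file system. A real `StructuredLogger` instance could presumably be passed in its place.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional


class RecordingLogger:
    """Hypothetical stand-in with the same start_step/end_step signatures."""

    def __init__(self) -> None:
        self.events: List[tuple] = []
        self._counter = 0

    def start_step(self, step_name: str, step_type: str,
                   description: str = "", input_data: Any = None) -> int:
        self._counter += 1
        self.events.append(("start", self._counter, step_name))
        return self._counter

    def end_step(self, step_id: int, output_data: Any = None,
                 status: str = "success", error: Optional[str] = None) -> None:
        self.events.append(("end", step_id, status, error))


@contextmanager
def logged_step(logger: Any, step_name: str, step_type: str,
                description: str = "", input_data: Any = None
                ) -> Iterator[Dict[str, Any]]:
    """Open a step, yield a dict for the caller's output, and always close the step."""
    step_id = logger.start_step(step_name, step_type, description, input_data)
    result: Dict[str, Any] = {"step_id": step_id, "output": None}
    try:
        yield result
    except Exception as exc:
        # Record the failure, then let the exception propagate to the caller.
        logger.end_step(step_id, status="error", error=str(exc))
        raise
    else:
        logger.end_step(step_id, output_data=result["output"])


demo = RecordingLogger()

# Nested steps: the inner step ends before the outer one.
with logged_step(demo, "extract_keywords", "extraction") as outer:
    with logged_step(demo, "explore_level_1", "exploration") as inner:
        inner["output"] = ["a", "b"]
    outer["output"] = {"keywords": 2}

# A failing step is still closed, with status "error".
try:
    with logged_step(demo, "evaluate", "evaluation"):
        raise ValueError("boom")
except ValueError:
    pass
```

With this wrapper, nesting follows the `with` blocks, so steps end in the reverse order they started, which matches the stack-based level tracking in `start_step`.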