""" 结构化日志记录器 提供步骤化、可追溯、易于可视化的日志记录功能 """ import json import os from datetime import datetime from typing import Any, Optional from pathlib import Path class StructuredLogger: """ 结构化日志记录器 特点: 1. 每个步骤独立保存文件 2. 记录完整的时间线 3. 支持嵌套步骤(树形结构) 4. 便于可视化和debug """ def __init__(self, log_dir: str, run_id: str): """ 初始化日志记录器 Args: log_dir: 日志根目录 run_id: 本次运行的唯一标识 """ self.log_dir = Path(log_dir) self.run_id = run_id # 创建目录结构 self.steps_dir = self.log_dir / "steps" self.timeline_dir = self.log_dir / "timeline" self.artifacts_dir = self.log_dir / "artifacts" for dir_path in [self.steps_dir, self.timeline_dir, self.artifacts_dir]: dir_path.mkdir(parents=True, exist_ok=True) # 时间线记录 self.timeline = [] self.step_counter = 0 self.step_stack = [] # 用于嵌套步骤 # 初始化元数据 self.metadata = { "run_id": run_id, "start_time": datetime.now().isoformat(), "status": "running", "steps_count": 0, "log_dir": str(self.log_dir), } self._save_metadata() def start_step( self, step_name: str, step_type: str, description: str = "", input_data: Any = None ) -> int: """ 开始一个新步骤 Args: step_name: 步骤名称(如:"extract_keywords", "explore_level_1") step_type: 步骤类型(如:"extraction", "exploration", "analysis", "evaluation") description: 步骤描述 input_data: 输入数据 Returns: step_id: 步骤ID """ self.step_counter += 1 step_id = self.step_counter # 计算层级(基于栈深度) level = len(self.step_stack) parent_id = self.step_stack[-1] if self.step_stack else None step_info = { "step_id": step_id, "step_name": step_name, "step_type": step_type, "description": description, "level": level, "parent_id": parent_id, "status": "running", "start_time": datetime.now().isoformat(), "end_time": None, "duration_seconds": None, "input": self._serialize(input_data), "output": None, "error": None, } # 压入栈 self.step_stack.append(step_id) # 保存步骤开始信息 self._save_step(step_id, step_info) # 添加到时间线 self.timeline.append({ "timestamp": step_info["start_time"], "event": "step_start", "step_id": step_id, "step_name": step_name, "step_type": step_type, }) self._save_timeline() print(f"\n{' ' * level}[Step {step_id}] {step_name} - {description}") return step_id def end_step( self, step_id: int, output_data: Any = None, status: str = "success", error: Optional[str] = None ): """ 结束一个步骤 Args: step_id: 步骤ID output_data: 输出数据 status: 步骤状态("success", "error", "skipped") error: 错误信息(如果有) """ # 从栈中弹出 if self.step_stack and self.step_stack[-1] == step_id: self.step_stack.pop() # 读取步骤信息 step_info = self._load_step(step_id) # 更新步骤信息 end_time = datetime.now() start_time = datetime.fromisoformat(step_info["start_time"]) duration = (end_time - start_time).total_seconds() step_info.update({ "status": status, "end_time": end_time.isoformat(), "duration_seconds": duration, "output": self._serialize(output_data), "error": error, }) # 保存步骤结束信息 self._save_step(step_id, step_info) # 添加到时间线 self.timeline.append({ "timestamp": step_info["end_time"], "event": "step_end", "step_id": step_id, "step_name": step_info["step_name"], "status": status, "duration_seconds": duration, }) self._save_timeline() level = len(self.step_stack) status_emoji = "✅" if status == "success" else "❌" if status == "error" else "⏭️" print(f"{' ' * level}{status_emoji} [Step {step_id}] Completed in {duration:.2f}s") def log_artifact( self, step_id: int, artifact_name: str, artifact_data: Any, artifact_type: str = "json" ) -> str: """ 保存步骤的关联产物(如:API响应、中间结果等) Args: step_id: 步骤ID artifact_name: 产物名称 artifact_data: 产物数据 artifact_type: 产物类型("json", "text", "image"等) Returns: artifact_path: 产物文件路径 """ artifact_dir = self.artifacts_dir / f"step_{step_id:04d}" artifact_dir.mkdir(exist_ok=True) if artifact_type == "json": artifact_path = artifact_dir / f"{artifact_name}.json" with open(artifact_path, "w", encoding="utf-8") as f: json.dump(artifact_data, f, ensure_ascii=False, indent=2) elif artifact_type == "text": artifact_path = artifact_dir / f"{artifact_name}.txt" with open(artifact_path, "w", encoding="utf-8") as f: f.write(str(artifact_data)) else: artifact_path = artifact_dir / artifact_name with open(artifact_path, "wb") as f: f.write(artifact_data) print(f" 📎 Artifact saved: {artifact_path.name}") return str(artifact_path) def finalize(self, final_status: str = "success", final_output: Any = None): """ 完成整个运行,生成最终摘要 Args: final_status: 最终状态 final_output: 最终输出 """ self.metadata.update({ "end_time": datetime.now().isoformat(), "status": final_status, "steps_count": self.step_counter, "final_output": self._serialize(final_output), }) self._save_metadata() # 生成摘要 self._generate_summary() print(f"\n{'='*60}") print(f"Run completed: {final_status}") print(f"Total steps: {self.step_counter}") print(f"Log directory: {self.log_dir}") print(f"{'='*60}") def _save_step(self, step_id: int, step_info: dict): """保存步骤信息""" step_file = self.steps_dir / f"step_{step_id:04d}.json" with open(step_file, "w", encoding="utf-8") as f: json.dump(step_info, f, ensure_ascii=False, indent=2) def _load_step(self, step_id: int) -> dict: """加载步骤信息""" step_file = self.steps_dir / f"step_{step_id:04d}.json" with open(step_file, "r", encoding="utf-8") as f: return json.load(f) def _save_timeline(self): """保存时间线""" timeline_file = self.timeline_dir / "timeline.json" with open(timeline_file, "w", encoding="utf-8") as f: json.dump(self.timeline, f, ensure_ascii=False, indent=2) def _save_metadata(self): """保存元数据""" metadata_file = self.log_dir / "metadata.json" with open(metadata_file, "w", encoding="utf-8") as f: json.dump(self.metadata, f, ensure_ascii=False, indent=2) def _serialize(self, data: Any) -> Any: """序列化数据(处理Pydantic模型等)""" if data is None: return None # 处理Pydantic模型 if hasattr(data, "model_dump"): return data.model_dump() # 处理字典 if isinstance(data, dict): return {k: self._serialize(v) for k, v in data.items()} # 处理列表 if isinstance(data, list): return [self._serialize(item) for item in data] # 其他类型直接返回 return data def _generate_summary(self): """生成运行摘要""" summary = { "run_id": self.run_id, "status": self.metadata["status"], "start_time": self.metadata["start_time"], "end_time": self.metadata["end_time"], "total_steps": self.step_counter, "steps_overview": [], } # 汇总所有步骤 for step_id in range(1, self.step_counter + 1): step_info = self._load_step(step_id) summary["steps_overview"].append({ "step_id": step_id, "step_name": step_info["step_name"], "step_type": step_info["step_type"], "status": step_info["status"], "duration_seconds": step_info["duration_seconds"], }) # 保存摘要 summary_file = self.log_dir / "summary.json" with open(summary_file, "w", encoding="utf-8") as f: json.dump(summary, f, ensure_ascii=False, indent=2)