structured_logger.py 9.4 KB


  1. """
  2. 结构化日志记录器
  3. 提供步骤化、可追溯、易于可视化的日志记录功能
  4. """
  5. import json
  6. import os
  7. from datetime import datetime
  8. from typing import Any, Optional
  9. from pathlib import Path
  10. class StructuredLogger:
  11. """
  12. 结构化日志记录器
  13. 特点:
  14. 1. 每个步骤独立保存文件
  15. 2. 记录完整的时间线
  16. 3. 支持嵌套步骤(树形结构)
  17. 4. 便于可视化和debug
  18. """
  19. def __init__(self, log_dir: str, run_id: str):
  20. """
  21. 初始化日志记录器
  22. Args:
  23. log_dir: 日志根目录
  24. run_id: 本次运行的唯一标识
  25. """
  26. self.log_dir = Path(log_dir)
  27. self.run_id = run_id
  28. # 创建目录结构
  29. self.steps_dir = self.log_dir / "steps"
  30. self.timeline_dir = self.log_dir / "timeline"
  31. self.artifacts_dir = self.log_dir / "artifacts"
  32. for dir_path in [self.steps_dir, self.timeline_dir, self.artifacts_dir]:
  33. dir_path.mkdir(parents=True, exist_ok=True)
  34. # 时间线记录
  35. self.timeline = []
  36. self.step_counter = 0
  37. self.step_stack = [] # 用于嵌套步骤
  38. # 初始化元数据
  39. self.metadata = {
  40. "run_id": run_id,
  41. "start_time": datetime.now().isoformat(),
  42. "status": "running",
  43. "steps_count": 0,
  44. "log_dir": str(self.log_dir),
  45. }
  46. self._save_metadata()
  47. def start_step(
  48. self,
  49. step_name: str,
  50. step_type: str,
  51. description: str = "",
  52. input_data: Any = None
  53. ) -> int:
  54. """
  55. 开始一个新步骤
  56. Args:
  57. step_name: 步骤名称(如:"extract_keywords", "explore_level_1")
  58. step_type: 步骤类型(如:"extraction", "exploration", "analysis", "evaluation")
  59. description: 步骤描述
  60. input_data: 输入数据
  61. Returns:
  62. step_id: 步骤ID
  63. """
  64. self.step_counter += 1
  65. step_id = self.step_counter
  66. # 计算层级(基于栈深度)
  67. level = len(self.step_stack)
  68. parent_id = self.step_stack[-1] if self.step_stack else None
  69. step_info = {
  70. "step_id": step_id,
  71. "step_name": step_name,
  72. "step_type": step_type,
  73. "description": description,
  74. "level": level,
  75. "parent_id": parent_id,
  76. "status": "running",
  77. "start_time": datetime.now().isoformat(),
  78. "end_time": None,
  79. "duration_seconds": None,
  80. "input": self._serialize(input_data),
  81. "output": None,
  82. "error": None,
  83. }
  84. # 压入栈
  85. self.step_stack.append(step_id)
  86. # 保存步骤开始信息
  87. self._save_step(step_id, step_info)
  88. # 添加到时间线
  89. self.timeline.append({
  90. "timestamp": step_info["start_time"],
  91. "event": "step_start",
  92. "step_id": step_id,
  93. "step_name": step_name,
  94. "step_type": step_type,
  95. })
  96. self._save_timeline()
  97. print(f"\n{' ' * level}[Step {step_id}] {step_name} - {description}")
  98. return step_id
  99. def end_step(
  100. self,
  101. step_id: int,
  102. output_data: Any = None,
  103. status: str = "success",
  104. error: Optional[str] = None
  105. ):
  106. """
  107. 结束一个步骤
  108. Args:
  109. step_id: 步骤ID
  110. output_data: 输出数据
  111. status: 步骤状态("success", "error", "skipped")
  112. error: 错误信息(如果有)
  113. """
  114. # 从栈中弹出
  115. if self.step_stack and self.step_stack[-1] == step_id:
  116. self.step_stack.pop()
  117. # 读取步骤信息
  118. step_info = self._load_step(step_id)
  119. # 更新步骤信息
  120. end_time = datetime.now()
  121. start_time = datetime.fromisoformat(step_info["start_time"])
  122. duration = (end_time - start_time).total_seconds()
  123. step_info.update({
  124. "status": status,
  125. "end_time": end_time.isoformat(),
  126. "duration_seconds": duration,
  127. "output": self._serialize(output_data),
  128. "error": error,
  129. })
  130. # 保存步骤结束信息
  131. self._save_step(step_id, step_info)
  132. # 添加到时间线
  133. self.timeline.append({
  134. "timestamp": step_info["end_time"],
  135. "event": "step_end",
  136. "step_id": step_id,
  137. "step_name": step_info["step_name"],
  138. "status": status,
  139. "duration_seconds": duration,
  140. })
  141. self._save_timeline()
  142. level = len(self.step_stack)
  143. status_emoji = "✅" if status == "success" else "❌" if status == "error" else "⏭️"
  144. print(f"{' ' * level}{status_emoji} [Step {step_id}] Completed in {duration:.2f}s")
  145. def log_artifact(
  146. self,
  147. step_id: int,
  148. artifact_name: str,
  149. artifact_data: Any,
  150. artifact_type: str = "json"
  151. ) -> str:
  152. """
  153. 保存步骤的关联产物(如:API响应、中间结果等)
  154. Args:
  155. step_id: 步骤ID
  156. artifact_name: 产物名称
  157. artifact_data: 产物数据
  158. artifact_type: 产物类型("json", "text", "image"等)
  159. Returns:
  160. artifact_path: 产物文件路径
  161. """
  162. artifact_dir = self.artifacts_dir / f"step_{step_id:04d}"
  163. artifact_dir.mkdir(exist_ok=True)
  164. if artifact_type == "json":
  165. artifact_path = artifact_dir / f"{artifact_name}.json"
  166. with open(artifact_path, "w", encoding="utf-8") as f:
  167. json.dump(artifact_data, f, ensure_ascii=False, indent=2)
  168. elif artifact_type == "text":
  169. artifact_path = artifact_dir / f"{artifact_name}.txt"
  170. with open(artifact_path, "w", encoding="utf-8") as f:
  171. f.write(str(artifact_data))
  172. else:
  173. artifact_path = artifact_dir / artifact_name
  174. with open(artifact_path, "wb") as f:
  175. f.write(artifact_data)
  176. print(f" 📎 Artifact saved: {artifact_path.name}")
  177. return str(artifact_path)
  178. def finalize(self, final_status: str = "success", final_output: Any = None):
  179. """
  180. 完成整个运行,生成最终摘要
  181. Args:
  182. final_status: 最终状态
  183. final_output: 最终输出
  184. """
  185. self.metadata.update({
  186. "end_time": datetime.now().isoformat(),
  187. "status": final_status,
  188. "steps_count": self.step_counter,
  189. "final_output": self._serialize(final_output),
  190. })
  191. self._save_metadata()
  192. # 生成摘要
  193. self._generate_summary()
  194. print(f"\n{'='*60}")
  195. print(f"Run completed: {final_status}")
  196. print(f"Total steps: {self.step_counter}")
  197. print(f"Log directory: {self.log_dir}")
  198. print(f"{'='*60}")
  199. def _save_step(self, step_id: int, step_info: dict):
  200. """保存步骤信息"""
  201. step_file = self.steps_dir / f"step_{step_id:04d}.json"
  202. with open(step_file, "w", encoding="utf-8") as f:
  203. json.dump(step_info, f, ensure_ascii=False, indent=2)
  204. def _load_step(self, step_id: int) -> dict:
  205. """加载步骤信息"""
  206. step_file = self.steps_dir / f"step_{step_id:04d}.json"
  207. with open(step_file, "r", encoding="utf-8") as f:
  208. return json.load(f)
  209. def _save_timeline(self):
  210. """保存时间线"""
  211. timeline_file = self.timeline_dir / "timeline.json"
  212. with open(timeline_file, "w", encoding="utf-8") as f:
  213. json.dump(self.timeline, f, ensure_ascii=False, indent=2)
  214. def _save_metadata(self):
  215. """保存元数据"""
  216. metadata_file = self.log_dir / "metadata.json"
  217. with open(metadata_file, "w", encoding="utf-8") as f:
  218. json.dump(self.metadata, f, ensure_ascii=False, indent=2)
  219. def _serialize(self, data: Any) -> Any:
  220. """序列化数据(处理Pydantic模型等)"""
  221. if data is None:
  222. return None
  223. # 处理Pydantic模型
  224. if hasattr(data, "model_dump"):
  225. return data.model_dump()
  226. # 处理字典
  227. if isinstance(data, dict):
  228. return {k: self._serialize(v) for k, v in data.items()}
  229. # 处理列表
  230. if isinstance(data, list):
  231. return [self._serialize(item) for item in data]
  232. # 其他类型直接返回
  233. return data
  234. def _generate_summary(self):
  235. """生成运行摘要"""
  236. summary = {
  237. "run_id": self.run_id,
  238. "status": self.metadata["status"],
  239. "start_time": self.metadata["start_time"],
  240. "end_time": self.metadata["end_time"],
  241. "total_steps": self.step_counter,
  242. "steps_overview": [],
  243. }
  244. # 汇总所有步骤
  245. for step_id in range(1, self.step_counter + 1):
  246. step_info = self._load_step(step_id)
  247. summary["steps_overview"].append({
  248. "step_id": step_id,
  249. "step_name": step_info["step_name"],
  250. "step_type": step_info["step_type"],
  251. "status": step_info["status"],
  252. "duration_seconds": step_info["duration_seconds"],
  253. })
  254. # 保存摘要
  255. summary_file = self.log_dir / "summary.json"
  256. with open(summary_file, "w", encoding="utf-8") as f:
  257. json.dump(summary, f, ensure_ascii=False, indent=2)