# structured_logger.py
"""
Structured logger.

Provides step-based, traceable, and easy-to-visualize logging.
"""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
  10. class StructuredLogger:
  11. """
  12. 结构化日志记录器
  13. 特点:
  14. 1. 每个步骤独立保存文件
  15. 2. 记录完整的时间线
  16. 3. 支持嵌套步骤(树形结构)
  17. 4. 便于可视化和debug
  18. """
  19. def __init__(self, log_dir: str, run_id: str):
  20. """
  21. 初始化日志记录器
  22. Args:
  23. log_dir: 日志根目录
  24. run_id: 本次运行的唯一标识
  25. """
  26. self.log_dir = Path(log_dir)
  27. self.run_id = run_id
  28. # 创建目录结构
  29. self.steps_dir = self.log_dir / "steps"
  30. self.timeline_dir = self.log_dir / "timeline"
  31. self.artifacts_dir = self.log_dir / "artifacts"
  32. for dir_path in [self.steps_dir, self.timeline_dir, self.artifacts_dir]:
  33. dir_path.mkdir(parents=True, exist_ok=True)
  34. # 时间线记录
  35. self.timeline = []
  36. self.step_counter = 0
  37. self.step_stack = [] # 用于嵌套步骤
  38. # 初始化元数据
  39. self.metadata = {
  40. "run_id": run_id,
  41. "start_time": datetime.now().isoformat(),
  42. "status": "running",
  43. "steps_count": 0,
  44. "log_dir": str(self.log_dir),
  45. }
  46. self._save_metadata()
  47. def start_step(
  48. self,
  49. step_name: str,
  50. step_type: str,
  51. description: str = "",
  52. input_data: Any = None
  53. ) -> int:
  54. """
  55. 开始一个新步骤
  56. Args:
  57. step_name: 步骤名称(如:"extract_keywords", "explore_level_1")
  58. step_type: 步骤类型(如:"extraction", "exploration", "analysis", "evaluation")
  59. description: 步骤描述
  60. input_data: 输入数据
  61. Returns:
  62. step_id: 步骤ID
  63. """
  64. self.step_counter += 1
  65. step_id = self.step_counter
  66. # 计算层级(基于栈深度)
  67. level = len(self.step_stack)
  68. parent_id = self.step_stack[-1] if self.step_stack else None
  69. step_info = {
  70. "step_id": step_id,
  71. "step_name": step_name,
  72. "step_type": step_type,
  73. "description": description,
  74. "level": level,
  75. "parent_id": parent_id,
  76. "status": "running",
  77. "start_time": datetime.now().isoformat(),
  78. "end_time": None,
  79. "duration_seconds": None,
  80. "input": self._serialize(input_data),
  81. "output": None,
  82. "error": None,
  83. }
  84. # 压入栈
  85. self.step_stack.append(step_id)
  86. # 保存步骤开始信息
  87. self._save_step(step_id, step_info)
  88. # 添加到时间线
  89. self.timeline.append({
  90. "timestamp": step_info["start_time"],
  91. "event": "step_start",
  92. "step_id": step_id,
  93. "step_name": step_name,
  94. "step_type": step_type,
  95. })
  96. self._save_timeline()
  97. print(f"\n{' ' * level}[Step {step_id}] {step_name} - {description}")
  98. return step_id
  99. def end_step(
  100. self,
  101. step_id: int,
  102. output_data: Any = None,
  103. status: str = "success",
  104. error: Optional[str] = None
  105. ):
  106. """
  107. 结束一个步骤
  108. Args:
  109. step_id: 步骤ID
  110. output_data: 输出数据
  111. status: 步骤状态("success", "error", "skipped")
  112. error: 错误信息(如果有)
  113. """
  114. # 从栈中弹出
  115. if self.step_stack and self.step_stack[-1] == step_id:
  116. self.step_stack.pop()
  117. # 读取步骤信息
  118. step_info = self._load_step(step_id)
  119. # 更新步骤信息
  120. end_time = datetime.now()
  121. start_time = datetime.fromisoformat(step_info["start_time"])
  122. duration = (end_time - start_time).total_seconds()
  123. step_info.update({
  124. "status": status,
  125. "end_time": end_time.isoformat(),
  126. "duration_seconds": duration,
  127. "output": self._serialize(output_data),
  128. "error": error,
  129. })
  130. # 保存步骤结束信息
  131. self._save_step(step_id, step_info)
  132. # 添加到时间线
  133. self.timeline.append({
  134. "timestamp": step_info["end_time"],
  135. "event": "step_end",
  136. "step_id": step_id,
  137. "step_name": step_info["step_name"],
  138. "status": status,
  139. "duration_seconds": duration,
  140. })
  141. self._save_timeline()
  142. level = len(self.step_stack)
  143. status_emoji = "✅" if status == "success" else "❌" if status == "error" else "⏭️"
  144. print(f"{' ' * level}{status_emoji} [Step {step_id}] Completed in {duration:.2f}s")
  145. def log_artifact(
  146. self,
  147. step_id: int,
  148. artifact_name: str,
  149. artifact_data: Any,
  150. artifact_type: str = "json"
  151. ) -> str:
  152. """
  153. 保存步骤的关联产物(如:API响应、中间结果等)
  154. Args:
  155. step_id: 步骤ID
  156. artifact_name: 产物名称
  157. artifact_data: 产物数据
  158. artifact_type: 产物类型("json", "text", "image"等)
  159. Returns:
  160. artifact_path: 产物文件路径
  161. """
  162. artifact_dir = self.artifacts_dir / f"step_{step_id:04d}"
  163. artifact_dir.mkdir(exist_ok=True)
  164. if artifact_type == "json":
  165. artifact_path = artifact_dir / f"{artifact_name}.json"
  166. with open(artifact_path, "w", encoding="utf-8") as f:
  167. json.dump(artifact_data, f, ensure_ascii=False, indent=2)
  168. elif artifact_type == "text":
  169. artifact_path = artifact_dir / f"{artifact_name}.txt"
  170. with open(artifact_path, "w", encoding="utf-8") as f:
  171. f.write(str(artifact_data))
  172. else:
  173. artifact_path = artifact_dir / artifact_name
  174. with open(artifact_path, "wb") as f:
  175. f.write(artifact_data)
  176. print(f" 📎 Artifact saved: {artifact_path.name}")
  177. return str(artifact_path)
  178. def finalize(self, final_status: str = "success", final_output: Any = None):
  179. """
  180. 完成整个运行,生成最终摘要
  181. Args:
  182. final_status: 最终状态
  183. final_output: 最终输出
  184. """
  185. self.metadata.update({
  186. "end_time": datetime.now().isoformat(),
  187. "status": final_status,
  188. "steps_count": self.step_counter,
  189. "final_output": self._serialize(final_output),
  190. })
  191. self._save_metadata()
  192. # 生成摘要
  193. self._generate_summary()
  194. print(f"\n{'='*60}")
  195. print(f"Run completed: {final_status}")
  196. print(f"Total steps: {self.step_counter}")
  197. print(f"Log directory: {self.log_dir}")
  198. print(f"{'='*60}")
  199. def _save_step(self, step_id: int, step_info: dict):
  200. """保存步骤信息"""
  201. step_file = self.steps_dir / f"step_{step_id:04d}.json"
  202. with open(step_file, "w", encoding="utf-8") as f:
  203. json.dump(step_info, f, ensure_ascii=False, indent=2)
  204. def _load_step(self, step_id: int) -> dict:
  205. """加载步骤信息"""
  206. step_file = self.steps_dir / f"step_{step_id:04d}.json"
  207. with open(step_file, "r", encoding="utf-8") as f:
  208. return json.load(f)
  209. def _save_timeline(self):
  210. """保存时间线"""
  211. timeline_file = self.timeline_dir / "timeline.json"
  212. with open(timeline_file, "w", encoding="utf-8") as f:
  213. json.dump(self.timeline, f, ensure_ascii=False, indent=2)
  214. def _save_metadata(self):
  215. """保存元数据"""
  216. metadata_file = self.log_dir / "metadata.json"
  217. with open(metadata_file, "w", encoding="utf-8") as f:
  218. json.dump(self.metadata, f, ensure_ascii=False, indent=2)
  219. def _serialize(self, data: Any) -> Any:
  220. """序列化数据(处理Pydantic模型等)"""
  221. if data is None:
  222. return None
  223. # 处理Pydantic模型
  224. if hasattr(data, "model_dump"):
  225. return data.model_dump()
  226. # 处理字典
  227. if isinstance(data, dict):
  228. return {k: self._serialize(v) for k, v in data.items()}
  229. # 处理列表
  230. if isinstance(data, list):
  231. return [self._serialize(item) for item in data]
  232. # 其他类型直接返回
  233. return data
  234. def _generate_summary(self):
  235. """生成运行摘要"""
  236. summary = {
  237. "run_id": self.run_id,
  238. "status": self.metadata["status"],
  239. "start_time": self.metadata["start_time"],
  240. "end_time": self.metadata["end_time"],
  241. "total_steps": self.step_counter,
  242. "steps_overview": [],
  243. }
  244. # 汇总所有步骤
  245. for step_id in range(1, self.step_counter + 1):
  246. step_info = self._load_step(step_id)
  247. summary["steps_overview"].append({
  248. "step_id": step_id,
  249. "step_name": step_info["step_name"],
  250. "step_type": step_info["step_type"],
  251. "status": step_info["status"],
  252. "duration_seconds": step_info["duration_seconds"],
  253. })
  254. # 保存摘要
  255. summary_file = self.log_dir / "summary.json"
  256. with open(summary_file, "w", encoding="utf-8") as f:
  257. json.dump(summary, f, ensure_ascii=False, indent=2)